RDD 常用算子¶
Note
从是否触发计算来说,RDD 算子可分为 Transformations 类算子和 Actions 类算子。
从算子用途来说,RDD 有数据转换、数据聚合、数据持久化等类型的算子。
算子类型 |
适用范围 |
算子用途 |
算子集合 |
---|---|---|---|
Transformations |
任意RDD |
RDD内数据转换 |
map |
Paired RDD |
RDD内数据聚合 |
groupByKey |
|
任意RDD |
数据整合 |
union |
|
任意RDD |
重分布 |
coalesce |
|
Actions |
任意RDD |
数据收集 |
collect |
任意RDD |
数据持久化 |
saveAsTextFile |
RDD内数据转换¶
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("rdd operator").setMaster("local")
sc = SparkContext(conf=conf)
# 第一节的代码
lineRDD = sc.textFile("../data/wikiOfSpark.txt")
# flatMap: 先从元素到集合、再从集合到元素
wordRDD = lineRDD.flatMap(lambda line: line.split(" "))
# 调用 filter(f),其作用是保留 RDD 中 f 返回 True 的数据元素,过滤其它元素。
cleanWordRDD = wordRDD.filter(lambda word: word != "")
map¶
给定映射函数 f,map(f) 以元素为粒度对 RDD 做数据转换。
f 可以是带名函数也可以是匿名函数,效果都是一样的。
def f(word):
return (word, 1)
# 带名函数
kvRDD = cleanWordRDD.map(f)
# 匿名函数,效果同上
kvRDD = cleanWordRDD.map(lambda word: (word, 1))
mapPartitions¶
以数据分区为粒度,使用映射函数 f 对 RDD 进行数据转换。
可以使用 mapPartitions 来改善执行性能。
from hashlib import md5
def f(partition):
"""在处理每一条数据记录的时候,可用复用同一个Partition内的md5对象"""
m = md5()
for word in partition:
m.update(word.encode())
yield m.hexdigest()
# 先加密 word,再做转化
kvMD5RDD = cleanWordRDD.mapPartitions(f).map(lambda word: (word, 1))
print(kvMD5RDD.take(2))
[('e9713ae04a02a810d6f33dd956f42794', 1), ('032d2d5e07dd65f436bf59e8135822d2', 1)]
RDD内数据聚合¶
接下来要介绍5个聚合算子,在它们计算的过程中,都会引入 Shuffle。
groupByKey¶
groupByKey 的功能是对 Key 值相同的元素做分组,然后把相应的 Value 值,以集合
的形式收集到一起。
# Key 和 Value 都变为单词
kvSameRDD = cleanWordRDD.map(lambda word: (word, word))
# [(Spark, (Spark, Spark, Spark)), (Streaming, (Streaming, Streaming))] 这样的数据
words = kvSameRDD.groupByKey()
reduceByKey¶
分组聚合
# 随机的value
kvRandomRDD = cleanWordRDD.map(lambda word: (word, random.randint(0, 100)))
# 聚合函数: 同组内最大的value
wordCounts = kvRandomRDD.reduceByKey(lambda x, y: max(x, y))
aggregateByKey¶
reduceByKey: Map 端聚合函数和 Reduce 端聚合函数都一样。
aggregateByKey: 分别指定 Map 端聚合函数和 Reduce 端聚合函数。
def f1(x, y):
# 定义 Map 阶段聚合函数
return x + y
def f2(x, y):
# 定义 Reduce 阶段聚合函数
return max(x, y)
# 初始值(需与f2结果类型保持一致),Map 函数,Reduce 函数
wordCounts = kvRDD.aggregateByKey(0, f1, f2)
sortByKey 和 sortBy¶
以 Key 为准对 RDD 做排序。
print(wordCounts.sortByKey(ascending=False).take(5))
[('|conference=', 2), ('|', 3), ('your', 1), ('you', 3), ('years', 1)]
print(wordCounts.sortBy(lambda x: x[1], ascending=False).take(5))
[('the', 67), ('Spark', 63), ('a', 54), ('and', 51), ('of', 50)]
数据整合¶
words1 = ["Spark", "is", "cool"]
words2 = ["what", "is", "Apache"]
rdd1 = sc.parallelize(words1)
rdd2 = sc.parallelize(words2)
# union: 合并两个同类型RDD
rdd = rdd1.union(rdd2)
rdd = sc.parallelize(list(range(100)))
# sample: 对RDD做随机采样
# 采样是否有放回,采样比例,随机数种子(可选)
print(rdd.sample(False, 0.1, 123).take(10))
[5, 7, 9, 13, 16, 39, 48, 53, 55, 71]
重分布¶
# 查看分区数量
print(rdd.getNumPartitions())
# repartition: 调整RDD的并行度(即RDD的数据分区数量),可增可降
rdd1 = rdd.repartition(20)
print(rdd1.getNumPartitions())
# coalesce: 降低RDD的并行度,不会触发shuffle
rdd2 = rdd1.coalesce(5)
print(rdd2.getNumPartitions())
1
20
5
数据收集¶
take 我们已经很熟悉了,即收集数个元素。
collect: 收集所有元素。
first: 收集所有元素。
print(rdd.collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
print(rdd.first())
0
数据持久化¶
import os
path = "../data/persist"
if not os.path.exists(path=path):
cleanWordRDD.saveAsTextFile(path=path)